Skip to content

后端 socket.io

注意事项

shell
# 关于 io 实例,创建连接和创建命名空间的概念不要混淆
const socket = io("http://localhost:3000/chat");
以上代码的作用有两个功能:
    前提是 需要判断是否已经建立 socket 连接
        未连接的情况下:一是创建 http://localhost:3000 连接通道,二是创建 /chat 命名空间
        已连接的情况下:则省略一,只进入二
    后面还可以写多个 io 实例代码:const game= io('/game')  
        后续的实例不会再创建连接,在已有的连接基础上创建一个子通道,只是创建对应的命名空间
如果不写域名,直接写 const game= io('/game')  且只有这一个实例
    那么,Socket.IO 会自动补全域名和端口,默认连接当前网站的域名 + 当前端口,再创建一个 socket 连接,再创建一个 /game 命名空间
域名/端口是否相同还有不同的说法
    域名 / 端口不同 新建连接
        io('a.com')
        io('b.com')
 两条连接
    域名 / 端口相同 复用连接
        io('a.com')
        io('a.com/game')
 同一条连接,不同命名空间
命名空间 房间
    命名空间:连接上的子通道
    房间:命名空间里的小组

# 总结
1. 不同域名 = 不同连接
2. 相同域名 = 复用连接 + 多命名空间
3. io('/game') 不写域名 = 连接当前网站
4. 连接是物理通道
5. 命名空间是逻辑通道
6. 房间是命名空间下的小组

核心 API

shell
Socket.IO 服务端 API 围绕三个核心对象展开:Server(顶层服务器实例)、Namespace(命名空间,用于逻辑隔离)和 Socket(单个客户端连接)。
与客户端 API 相比,服务端多了广播、房间管理、跨服务器通信等面向多客户端的管理能力。

# Server 实例
Server 实例(代码中常命名为 io)是 Socket.IO 服务端的入口,负责管理所有连接和命名空间。
初始化和配置方法
    new Server(port, options) 创建服务器实例,可传入端口、HTTP 服务器或直接传入 options
    io.serveClient(value) 设置是否自动托管客户端静态文件,默认 true。生产环境可设为 false
    io.path(value) 设置 WebSocket 服务的路径,默认 /socket.io/,需与客户端保持一致
    io.adapter(value) 设置适配器,默认使用内存适配器。多服务器部署时需使用 Redis 等共享适配器
    io.attach(target, options) 将 Socket.IO 服务器附加到已有的 HTTP 服务器或端口上
    io.bind(engine) 高级用法:绑定到自定义的 Engine.IO 实例
核心管理方法(Cluster 友好)
    io.socketsJoin(rooms) 让匹配的 Socket 加入指定房间
    io.socketsLeave(rooms) 让匹配的 Socket 离开指定房间
    io.disconnectSockets(close) 断开匹配的 Socket 连接
    io.fetchSockets() 异步获取匹配的 Socket 实例数组(Promise)
    io.serverSideEmit(event, args, ack) 向集群中的其他 Socket.IO 服务器发送事件(v4.1.0+)
事件发射方法
    io.emit(event, ...args) 向所有连接的客户端广播事件
    io.emitWithAck(event, ...args) 广播事件并等待所有客户端的确认(Promise 版,v4.6.0+)
    io.to(room).emit(event, ...args) 只向指定房间内的客户端发送事件
    io.except(room).emit(event, ...args) 向除了指定房间以外的所有客户端发送事件
    io.timeout(timeout).emit(event, ack) 带超时的广播,等待所有客户端确认
系统事件监听
    connection / connect 有新客户端连接时触发,参数为 Socket 实例
    new_namespace 新命名空间被创建时触发
底层访问属性
    io.sockets 主命名空间的别名(/),等同于 io.of('/')
    io.engine 底层 Engine.IO 服务器的引用,可访问 clientsCount、监听 headers 事件等
    io.of(nsp) 获取或创建指定路径的命名空间
    io.close(callback) 关闭服务器并断开所有客户端连接

# Namespace
Namespace 是一组 Socket 的逻辑分组,用于隔离不同业务模块(如 /chat /game)
属性/方法
    namespace.name 命名空间的路径标识符
    namespace.sockets Map 结构,包含该命名空间下所有 Socket 实例
    namespace.adapter 该命名空间使用的适配器实例
    namespace.to(room) 返回 BroadcastOperator,用于向指定房间广播
    namespace.emit(event, ...args) 向该命名空间下所有客户端广播
    namespace.use(fn) 为该命名空间添加中间件,在每次连接前执行
命名空间创建示例
    const adminNamespace = io.of('/admin');
    const dynamicNsp = io.of(/^\/dynamic-\d+$/);  # 支持正则表达式动态创建

# 客户端连接 API:Socket 实例
方法
    socket.emit(event, ...args, [ack]) 向该客户端发送事件,可带确认回调
    socket.emitWithAck(event, ...args) Promise 版的带确认发送(v4.6.0+)
    socket.on(event, listener) 监听客户端发来的事件
    socket.once(event, listener) 单次监听
    socket.off(event, listener) 移除事件监听
    socket.compress(value) 设置下一个发送的数据包是否压缩
广播与房间方法
    socket.join(room) 让当前 Socket 加入指定房间
    socket.leave(room) 让当前 Socket 离开指定房间
    socket.to(room) 向该房间内除自己以外的客户端广播(链式调用)
    socket.broadcast 向除自己以外的所有客户端广播
    socket.broadcast.to(room) 向房间内除自己以外的客户端广播
属性和状态
    socket.id 当前 Socket 的唯一标识符
    socket.rooms Set 结构,包含当前 Socket 所在的所有房间
    socket.handshake 握手信息,包含 headers、query、auth
    socket.connected 布尔值,表示连接状态
    socket.data 可共享的任意数据对象(支持跨服务器同步,v4.4.0+)
    socket.disconnect(close) 主动断开连接
辅助类型:BroadcastOperator
    operator.to(room) 增加目标房间
    operator.except(room) 排除指定房间
    operator.local 仅影响当前服务器节点(在多服务器集群中)
    operator.timeout(time) 设置广播的超时时间
    operator.emit(event, ...args) 执行广播
    operator.emitWithAck(event, ...args) Promise 版广播
    operator.fetchSockets() 获取匹配的 Socket 实例数组
    operator.disconnectSockets(close) 断开匹配的 Socket
    operator.socketsJoin(rooms) 让匹配的 Socket 加入房间
    operator.socketsLeave(rooms) 让匹配的 Socket 离开房间
常用配置选项: 创建服务器时可传入的 options 对象
    选项 类型 默认值 描述
    path String /socket.io 服务路径
    serveClient Boolean true 是否托管客户端文件
    adapter Adapter 内存适配器 多服务器部署时需要使用 Redis 适配器
    cors Object {} CORS 配置
    pingTimeout Number 5000 等待 pong 包的超时时间(ms)
    pingInterval Number 25000 发送 ping 包的间隔(ms)
    transports Array ['polling', 'websocket'] 传输方式及顺序
    allowEIO3 Boolean false 是否允许兼容 EIO3 客户端
    connectionStateRecovery Object - 连接状态恢复配置(v4.6.0+)

简单场景

1、基础连接与广播

js
io.on('connection', (socket) => {
  console.log('新用户连接:', socket.id);
  socket.on('chat message', (msg) => {
    io.emit('chat message', msg);  // 广播给所有人
  });
});

2、房间管理(聊天室)

js
io.on('connection', (socket) => {
  socket.on('join room', (roomName) => {
    socket.join(roomName);
    io.to(roomName).emit('user joined', socket.id);
  });
  socket.on('room message', (data) => {
    socket.to(data.room).emit('message', data.text);  // 不发给发送者
  });
});

3、带确认的消息(订单系统)

js
io.on('connection', (socket) => {
  socket.on('create order', (orderData, callback) => {
    // 处理订单...
    if (success) {
      callback({ status: 'ok', orderId: '123' });
    } else {
      callback({ status: 'error', error: '库存不足' });
    }
  });
});

4、命名空间隔离

js
const adminIo = io.of('/admin');
adminIo.use((socket, next) => {
  const token = socket.handshake.auth.token;
  if (isValidToken(token)) return next();
  next(new Error('unauthorized'));
});

adminIo.on('connection', (socket) => {
  socket.on('admin action', (data) => {
    adminIo.emit('admin update', data);  // 只发给 admin 命名空间的客户端
  });
});

5、中间件(认证)

js
io.use((socket, next) => {
  const token = socket.handshake.auth.token;
  if (verifyToken(token)) {
    socket.data.userId = decodeToken(token).userId;
    next();
  } else {
    next(new Error('Authentication error'));
  }
});

io.on('connection', (socket) => {
  console.log('认证用户:', socket.data.userId);
});

6、多服务器集群操作

js
// 使用 Redis 适配器共享状态
const { createAdapter } = require('@socket.io/redis-adapter');
const redisClient = require('redis').createClient();
io.adapter(createAdapter(redisClient, redisClient.duplicate()));

// 跨服务器获取所有在线用户
const allSockets = await io.fetchSockets();
console.log(`全网在线人数: ${allSockets.length}`);

复杂场景

1、实时协作编辑

适用于在线文档、代码协作、白板等多人实时编辑场景。

js
// 服务端
const documents = new Map(); // docId -> { content, version, users }

io.on('connection', (socket) => {
  let currentDocId = null;
  
  socket.on('join document', (docId, callback) => {
    currentDocId = docId;
    socket.join(`doc:${docId}`);
    
    if (!documents.has(docId)) {
      documents.set(docId, { content: '', version: 0, users: new Set() });
    }
    
    const doc = documents.get(docId);
    doc.users.add(socket.id);
    
    // 发送当前文档内容和用户列表
    callback({
      content: doc.content,
      version: doc.version,
      users: Array.from(doc.users)
    });
    
    // 通知其他人有新用户加入
    socket.to(`doc:${docId}`).emit('user joined', socket.id);
  });
  
  socket.on('edit operation', (ops, version, callback) => {
    const doc = documents.get(currentDocId);
    if (!doc) return;
    
    // 乐观锁检查:如果客户端版本落后,需要进行冲突处理
    if (version !== doc.version) {
      callback({ 
        status: 'conflict', 
        currentContent: doc.content,
        currentVersion: doc.version 
      });
      return;
    }
    
    // 应用操作(这里简化了 OT 或 CRDT 的实际逻辑)
    doc.content = applyOperation(doc.content, ops);
    doc.version++;
    
    // 广播给房间内其他用户
    socket.to(`doc:${currentDocId}`).emit('operation', ops, doc.version, socket.id);
    callback({ status: 'success', version: doc.version });
  });
  
  socket.on('cursor move', (position) => {
    socket.to(`doc:${currentDocId}`).emit('cursor update', socket.id, position);
  });
  
  socket.on('disconnect', () => {
    if (currentDocId) {
      const doc = documents.get(currentDocId);
      if (doc) {
        doc.users.delete(socket.id);
        socket.to(`doc:${currentDocId}`).emit('user left', socket.id);
      }
    }
  });
});

// 客户端实现 OT/CRDT 算法示例
function applyOperation(content, ops) {
  // 实现操作转换(Operational Transformation)
  // 这里简化处理
  return ops.reduce((text, op) => {
    if (op.type === 'insert') {
      return text.slice(0, op.position) + op.text + text.slice(op.position);
    } else if (op.type === 'delete') {
      return text.slice(0, op.position) + text.slice(op.position + op.length);
    }
    return text;
  }, content);
}

2、实时数据监控仪表板(DevOps/物联网)

适用于服务器监控、生产线监控、智能家居控制中心等。

js
// 服务端
class MetricsCollector {
  constructor(io) {
    this.io = io;
    this.metrics = {
      cpu: [],
      memory: [],
      requests: 0,
      errors: 0
    };
    this.thresholds = {
      cpu: 80,    // CPU 超过 80% 报警
      memory: 90   // 内存超过 90% 报警
    };
    
    this.startMonitoring();
  }
  
  startMonitoring() {
    setInterval(() => {
      const currentMetrics = this.collectMetrics();
      this.metrics.cpu.push(currentMetrics.cpu);
      this.metrics.memory.push(currentMetrics.memory);
      
      // 保留最近 60 个数据点
      if (this.metrics.cpu.length > 60) this.metrics.cpu.shift();
      if (this.metrics.memory.length > 60) this.metrics.memory.shift();
      
      // 主动推送实时数据
      this.io.emit('metrics update', {
        cpu: currentMetrics.cpu,
        memory: currentMetrics.memory,
        timestamp: Date.now(),
        uptime: process.uptime()
      });
      
      // 警报检测
      if (currentMetrics.cpu > this.thresholds.cpu) {
        this.io.emit('alert', {
          type: 'HIGH_CPU',
          value: currentMetrics.cpu,
          threshold: this.thresholds.cpu
        });
      }
    }, 2000); // 每 2 秒采集一次
  }
  
  collectMetrics() {
    return {
      cpu: Math.random() * 100, // 实际应用中应使用 os 模块
      memory: process.memoryUsage().heapUsed / 1024 / 1024
    };
  }
  
  // 支持按需订阅特定指标
  setupSubscriptions() {
    this.io.on('connection', (socket) => {
      const subscriptions = new Set();
      
      socket.on('subscribe metric', (metricName) => {
        subscriptions.add(metricName);
        socket.emit('subscribed', metricName);
      });
      
      socket.on('unsubscribe metric', (metricName) => {
        subscriptions.delete(metricName);
      });
      
      // 自定义推送频率
      let interval;
      socket.on('set polling rate', (rate) => {
        if (interval) clearInterval(interval);
        interval = setInterval(() => {
          if (subscriptions.size > 0) {
            const filteredData = {};
            subscriptions.forEach(metric => {
              filteredData[metric] = this.metrics[metric]?.slice(-1)[0];
            });
            socket.emit('custom metrics', filteredData);
          }
        }, rate);
      });
    });
  }
}

// 物联网设备接入
io.on('connection', (socket) => {
  let deviceId = null;
  
  socket.on('device register', (deviceInfo, callback) => {
    deviceId = deviceInfo.id;
    socket.join(`device:${deviceId}`);
    
    // 注册设备到数据库
    registerDevice(deviceInfo);
    callback({ status: 'ok', message: 'Device registered' });
    
    // 通知控制台新设备上线
    io.emit('device online', deviceInfo);
  });
  
  socket.on('telemetry', (data) => {
    // 存储时序数据(如 InfluxDB)
    storeTelemetry(deviceId, {
      temperature: data.temp,
      humidity: data.humidity,
      timestamp: Date.now()
    });
    
    // 实时转发给订阅该设备的客户端
    io.to(`device:${deviceId}`).emit('telemetry update', data);
    
    // 阈值检测
    if (data.temp > 40) {
      io.emit('device alert', {
        deviceId,
        type: 'OVERHEAT',
        value: data.temp
      });
    }
  });
  
  socket.on('control device', (command, callback) => {
    // 发送控制指令到设备
    socket.to(`device:${command.deviceId}`).emit('command', command);
    callback({ status: 'sent', timestamp: Date.now() });
  });
});

3、实时投票/竞拍系统

适用于直播投票、在线拍卖、抢答系统等。

js
// 服务端
class AuctionSystem {
  constructor(io) {
    this.io = io;
    this.auctions = new Map(); // auctionId -> auction data
    this.bids = new Map();     // auctionId -> bids array
  }
  
  createAuction(auctionData) {
    const auction = {
      id: generateId(),
      item: auctionData.item,
      startingPrice: auctionData.startingPrice,
      currentPrice: auctionData.startingPrice,
      endTime: Date.now() + auctionData.duration,
      status: 'active',
      winner: null
    };
    
    this.auctions.set(auction.id, auction);
    this.bids.set(auction.id, []);
    
    // 创建专用房间
    const room = `auction:${auction.id}`;
    
    // 开始倒计时广播
    const countdownInterval = setInterval(() => {
      const now = Date.now();
      const timeLeft = auction.endTime - now;
      
      if (timeLeft <= 0) {
        clearInterval(countdownInterval);
        auction.status = 'ended';
        this.io.to(room).emit('auction ended', {
          winner: auction.winner,
          finalPrice: auction.currentPrice
        });
        return;
      }
      
      this.io.to(room).emit('countdown', timeLeft);
    }, 1000);
    
    return auction;
  }
  
  setupHandlers() {
    this.io.on('connection', (socket) => {
      // 加入拍卖
      socket.on('join auction', (auctionId) => {
        const auction = this.auctions.get(auctionId);
        if (auction && auction.status === 'active') {
          socket.join(`auction:${auctionId}`);
          socket.emit('auction state', {
            currentPrice: auction.currentPrice,
            endTime: auction.endTime,
            bidHistory: this.bids.get(auctionId).slice(-10)
          });
        }
      });
      
      // 出价
      socket.on('place bid', (auctionId, amount, callback) => {
        const auction = this.auctions.get(auctionId);
        
        // 验证出价
        if (!auction || auction.status !== 'active') {
          return callback({ error: 'Auction not active' });
        }
        
        if (Date.now() > auction.endTime) {
          return callback({ error: 'Auction ended' });
        }
        
        if (amount <= auction.currentPrice) {
          return callback({ error: 'Bid too low' });
        }
        
        // 更新价格
        const oldPrice = auction.currentPrice;
        auction.currentPrice = amount;
        auction.winner = socket.id;
        
        // 记录出价
        const bid = {
          bidder: socket.id,
          amount: amount,
          timestamp: Date.now()
        };
        this.bids.get(auctionId).push(bid);
        
        // 广播新出价
        this.io.to(`auction:${auctionId}`).emit('new bid', {
          bidder: socket.id,
          amount: amount,
          oldPrice: oldPrice
        });
        
        callback({ success: true, currentPrice: amount });
      });
      
      // 立即购买(Buy It Now)
      socket.on('buy now', (auctionId, callback) => {
        const auction = this.auctions.get(auctionId);
        if (auction && auction.buyItNowPrice) {
          auction.status = 'ended';
          auction.winner = socket.id;
          auction.currentPrice = auction.buyItNowPrice;
          
          this.io.to(`auction:${auctionId}`).emit('buy now executed', {
            buyer: socket.id,
            price: auction.buyItNowPrice
          });
          
          callback({ success: true });
        }
      });
    });
  }
}

// 投票系统示例
class VotingSystem {
  constructor(io, optionsId) {
    this.io = io;
    this.options = optionsId;
    this.votes = new Map(); // optionId -> count
    this.voters = new Set(); // 防止重复投票
    
    optionsId.forEach(opt => this.votes.set(opt.id, 0));
  }
  
  setup() {
    this.io.on('connection', (socket) => {
      // 实时投票
      socket.on('vote', (optionId, voterId, callback) => {
        if (this.voters.has(voterId)) {
          return callback({ error: 'Already voted' });
        }
        
        if (!this.votes.has(optionId)) {
          return callback({ error: 'Invalid option' });
        }
        
        // 更新票数
        const newCount = this.votes.get(optionId) + 1;
        this.votes.set(optionId, newCount);
        this.voters.add(voterId);
        
        // 实时广播更新
        const results = Array.from(this.votes.entries()).map(([id, count]) => ({
          id,
          count,
          percentage: (count / this.voters.size) * 100
        }));
        
        this.io.emit('vote update', {
          optionId,
          newCount,
          results,
          totalVotes: this.voters.size
        });
        
        callback({ success: true, results });
      });
      
      // 获取实时结果
      socket.on('get results', () => {
        const results = Array.from(this.votes.entries()).map(([id, count]) => ({
          id,
          count,
          percentage: this.voters.size ? (count / this.voters.size) * 100 : 0
        }));
        
        socket.emit('results', {
          results,
          totalVotes: this.voters.size
        });
      });
    });
  }
}

4、实时游戏对战(帧同步/状态同步)

适用于 FPS、MOBA、赛车等实时竞技游戏。

js
// 服务端 - 帧同步架构
class GameRoom {
  constructor(roomId, io) {
    this.roomId = roomId;
    this.io = io;
    this.players = new Map(); // userId -> { ready, inputs, lastProcessedFrame }
    this.gameState = null;
    this.currentFrame = 0;
    this.frameInterval = 50; // 50ms 一帧 (20FPS)
    this.inputBuffer = []; // 存储所有玩家的输入
    this.room = `game:${roomId}`;
  }
  
  start() {
    // 游戏主循环
    this.gameLoop = setInterval(() => {
      this.updateFrame();
    }, this.frameInterval);
  }
  
  updateFrame() {
    this.currentFrame++;
    
    // 收集这一帧所有玩家的输入
    const frameInputs = [];
    for (let [userId, player] of this.players) {
      const input = player.inputs.get(this.currentFrame);
      if (input) {
        frameInputs.push({ userId, input });
        player.inputs.delete(this.currentFrame);
      }
    }
    
    if (frameInputs.length > 0 || this.currentFrame % 10 === 0) {
      // 更新游戏状态(这里简化处理)
      this.gameState = this.calculateNextState(this.gameState, frameInputs);
      
      // 广播帧数据
      this.io.to(this.room).emit('game frame', {
        frame: this.currentFrame,
        state: this.gameState,
        inputs: frameInputs
      });
    }
  }
  
  calculateNextState(currentState, inputs) {
    // 实现游戏逻辑
    // 这里用简单示例替代
    return {
      ...currentState,
      timestamp: Date.now(),
      frame: this.currentFrame
    };
  }
  
  setupSocket(socket) {
    socket.join(this.room);
    
    socket.on('player input', ({ frame, input }) => {
      const player = this.players.get(socket.id);
      if (player) {
        // 存储输入,等待相应帧处理
        player.inputs.set(frame, input);
        
        // 确认收到输入(用于客户端预测)
        socket.emit('input acknowledged', { frame });
      }
    });
    
    socket.on('game sync request', (lastFrame) => {
      // 客户端请求重同步(用于断线重连)
      socket.emit('game sync', {
        currentFrame: this.currentFrame,
        gameState: this.gameState,
        pendingInputs: this.getPendingInputs(lastFrame)
      });
    });
    
    socket.on('disconnect', () => {
      this.players.delete(socket.id);
      this.io.to(this.room).emit('player left', socket.id);
      
      if (this.players.size === 0) {
        clearInterval(this.gameLoop);
      }
    });
  }
}

// 服务端 - 状态同步架构(适合卡牌、回合制游戏)
class StateSyncGame {
  constructor(roomId, io) {
    this.roomId = roomId;
    this.io = io;
    this.gameState = {
      players: new Map(),
      currentTurn: null,
      turnTimeLeft: 30000, // 30秒回合时间
      winner: null
    };
    this.actionHistory = [];
  }
  
  setup() {
    this.timer = setInterval(() => {
      if (this.gameState.currentTurn && this.gameState.turnTimeLeft > 0) {
        this.gameState.turnTimeLeft -= 1000;
        this.broadcastState();
        
        if (this.gameState.turnTimeLeft <= 0) {
          this.nextTurn();
        }
      }
    }, 1000);
  }
  
  handleAction(socket, action) {
    // 验证行动的合法性
    if (!this.validateAction(socket.id, action)) {
      socket.emit('action invalid', { reason: 'Invalid action' });
      return;
    }
    
    // 应用行动到游戏状态
    const result = this.applyAction(socket.id, action);
    
    // 记录行动历史
    this.actionHistory.push({
      playerId: socket.id,
      action,
      result,
      timestamp: Date.now()
    });
    
    // 广播行动结果给所有玩家
    this.io.to(this.roomId).emit('action executed', {
      playerId: socket.id,
      action,
      result,
      newState: this.gameState
    });
    
    // 检查游戏结束
    if (this.checkGameOver()) {
      this.io.to(this.roomId).emit('game over', { winner: this.gameState.winner });
      clearInterval(this.timer);
    } else {
      this.nextTurn();
    }
  }
  
  broadcastState() {
    // 只发送每个玩家能看到的信息(防止作弊)
    this.io.to(this.roomId).emit('game state', this.getVisibleState());
  }
  
  getVisibleState() {
    // 返回公开的游戏状态
    return {
      currentTurn: this.gameState.currentTurn,
      turnTimeLeft: this.gameState.turnTimeLeft,
      visibleCards: this.gameState.visibleCards,
      // 隐藏其他玩家的手牌等
    };
  }
}

5、实时直播弹幕/礼物系统

适用于直播平台、在线教育、虚拟活动等。

js
// 服务端
class LiveStreamSystem {
  constructor(io) {
    this.io = io;
    this.rooms = new Map(); // roomId -> { viewers, danmaku, gifts, settings }
    this.bannedUsers = new Set();
  }
  
  createRoom(roomId, settings) {
    this.rooms.set(roomId, {
      viewers: new Map(),
      danmaku: [],
      gifts: [],
      settings: {
        slowMode: false,    // 慢速模式(限制弹幕频率)
        emojiOnly: false,   // 仅限表情
        minLevel: 0,        // 最低等级要求
        ...settings
      },
      lastMessageTime: new Map()
    });
  }
  
  setupHandlers() {
    this.io.on('connection', (socket) => {
      let currentRoom = null;
      let userInfo = null;
      
      socket.on('join room', (roomId, user, callback) => {
        currentRoom = roomId;
        userInfo = user;
        
        const room = this.rooms.get(roomId);
        if (!room) {
          return callback({ error: 'Room not found' });
        }
        
        // 权限检查
        if (user.level < room.settings.minLevel) {
          return callback({ error: 'Level too low' });
        }
        
        socket.join(roomId);
        room.viewers.set(socket.id, user);
        
        // 发送最近的历史弹幕(最近100条)
        callback({
          status: 'ok',
          history: room.danmaku.slice(-100),
          viewers: room.viewers.size,
          settings: room.settings
        });
        
        // 通知房间人数更新
        this.io.to(roomId).emit('viewer count', room.viewers.size);
      });
      
      // 发送弹幕
      socket.on('danmaku', async (content, callback) => {
        const room = this.rooms.get(currentRoom);
        if (!room) return callback({ error: 'Not in room' });
        
        // 检查是否被禁言
        if (this.bannedUsers.has(socket.id)) {
          return callback({ error: 'Banned' });
        }
        
        // 慢速模式检查
        if (room.settings.slowMode) {
          const lastTime = room.lastMessageTime.get(socket.id) || 0;
          const now = Date.now();
          if (now - lastTime < 3000) { // 3秒限制
            return callback({ error: 'Slow mode: please wait' });
          }
          room.lastMessageTime.set(socket.id, now);
        }
        
        // 内容过滤(敏感词)
        const filtered = await this.filterContent(content);
        if (!filtered.valid) {
          return callback({ error: 'Content blocked', reason: filtered.reason });
        }
        
        const danmaku = {
          id: generateId(),
          userId: userInfo.id,
          userName: userInfo.name,
          content: filtered.text,
          timestamp: Date.now(),
          color: userInfo.color,
          level: userInfo.level
        };
        
        room.danmaku.push(danmaku);
        
        // 保留最近1000条
        if (room.danmaku.length > 1000) {
          room.danmaku.shift();
        }
        
        // 广播给房间内所有人
        this.io.to(currentRoom).emit('danmaku', danmaku);
        
        // 如果是高等级用户,可以发全局弹幕
        if (userInfo.level >= 30) {
          this.io.emit('global danmaku', danmaku);
        }
        
        callback({ success: true, id: danmaku.id });
      });
      
      // 发送礼物
      socket.on('send gift', (giftData, callback) => {
        const room = this.rooms.get(currentRoom);
        if (!room) return;
        
        const gift = {
          id: generateId(),
          userId: userInfo.id,
          userName: userInfo.name,
          giftId: giftData.giftId,
          giftName: giftData.name,
          count: giftData.count,
          totalValue: giftData.price * giftData.count,
          timestamp: Date.now()
        };
        
        room.gifts.push(gift);
        
        // 特效礼物(全屏动画)
        const isEffectGift = ['rocket', 'yacht', 'castle'].includes(giftData.giftId);
        
        if (isEffectGift) {
          // 全局广播特效
          this.io.emit('gift effect', {
            userId: userInfo.id,
            userName: userInfo.name,
            gift: giftData,
            roomId: currentRoom
          });
        }
        
        // 房间内广播
        this.io.to(currentRoom).emit('gift', gift);
        
        // 更新礼物排行榜
        this.updateGiftRanking(userInfo.id, gift.totalValue);
        
        callback({ success: true });
      });
      
      socket.on('disconnect', () => {
        if (currentRoom) {
          const room = this.rooms.get(currentRoom);
          if (room) {
            room.viewers.delete(socket.id);
            this.io.to(currentRoom).emit('viewer count', room.viewers.size);
          }
        }
      });
    });
  }
  
  async filterContent(content) {
    // 集成内容审核API(如阿里云、腾讯云)
    // 这里简化实现
    const sensitiveWords = ['敏感词1', '敏感词2'];
    for (const word of sensitiveWords) {
      if (content.includes(word)) {
        return { valid: false, reason: 'Sensitive word' };
      }
    }
    return { valid: true, text: content };
  }
  
  updateGiftRanking(userId, value) {
    // 更新Redis中的排行榜
    // redis.zincrby('gift:ranking', value, userId);
  }
}

6、实时日志流(Tail -f 风格)

适用于日志查看、DevOps调试、实时错误监控等。

js
// 服务端
class LogStreamService {
  constructor(io, logFile) {
    this.io = io;
    this.logFile = logFile;
    this.tailProcess = null;
    this.filters = new Map(); // socketId -> filter rules
    this.maxBufferSize = 10000;
    this.logBuffer = [];
  }
  
  startTailing() {
    const fs = require('fs');
    const { spawn } = require('child_process');
    
    // 使用 tail -f 命令
    this.tailProcess = spawn('tail', ['-f', this.logFile]);
    
    this.tailProcess.stdout.on('data', (data) => {
      const lines = data.toString().split('\n');
      
      for (const line of lines) {
        if (line.trim()) {
          this.processLogLine(line);
        }
      }
    });
    
    this.tailProcess.stderr.on('data', (data) => {
      console.error(`Tail error: ${data}`);
    });
  }
  
  processLogLine(line) {
    // 解析日志(JSON格式或自定义格式)
    let logEntry;
    try {
      logEntry = JSON.parse(line);
    } catch {
      logEntry = {
        raw: line,
        timestamp: Date.now(),
        level: 'INFO'
      };
    }
    
    // 添加到缓冲
    this.logBuffer.push(logEntry);
    if (this.logBuffer.length > this.maxBufferSize) {
      this.logBuffer.shift();
    }
    
    // 根据过滤规则分发给客户端
    for (const [socketId, filter] of this.filters) {
      const socket = this.io.sockets.sockets.get(socketId);
      if (socket && this.matchesFilter(logEntry, filter)) {
        socket.emit('log line', logEntry);
      }
    }
  }
  
  matchesFilter(logEntry, filter) {
    if (!filter) return true;
    
    // 级别过滤
    if (filter.level && logEntry.level !== filter.level) return false;
    
    // 关键词过滤
    if (filter.keyword) {
      const text = JSON.stringify(logEntry);
      if (!text.toLowerCase().includes(filter.keyword.toLowerCase())) return false;
    }
    
    // 正则过滤
    if (filter.regex) {
      const regex = new RegExp(filter.regex, 'i');
      if (!regex.test(JSON.stringify(logEntry))) return false;
    }
    
    return true;
  }
  
  setupSocket(socket) {
    // 初始化过滤规则
    this.filters.set(socket.id, null);
    
    // 设置过滤条件
    socket.on('set filter', (filter) => {
      this.filters.set(socket.id, filter);
      socket.emit('filter applied', filter);
    });
    
    // 获取历史日志
    socket.on('get history', (count = 100) => {
      const history = this.logBuffer.slice(-count);
      socket.emit('log history', history);
    });
    
    // 实时搜索
    socket.on('search logs', async (query) => {
      // 使用子进程执行 grep
      const { exec } = require('child_process');
      exec(`grep "${query}" ${this.logFile} | tail -100`, (error, stdout) => {
        if (error) {
          socket.emit('search results', { error: 'No results' });
        } else {
          const results = stdout.split('\n').filter(l => l.trim());
          socket.emit('search results', { results, count: results.length });
        }
      });
    });
    
    // 统计信息
    socket.on('get stats', () => {
      // 计算各日志级别的数量
      const stats = {
        total: this.logBuffer.length,
        levels: {}
      };
      
      for (const log of this.logBuffer) {
        const level = log.level || 'INFO';
        stats.levels[level] = (stats.levels[level] || 0) + 1;
      }
      
      socket.emit('log stats', stats);
    });
    
    socket.on('disconnect', () => {
      this.filters.delete(socket.id);
    });
  }
}

// 使用示例
const logStream = new LogStreamService(io, '/var/log/app.log');
logStream.startTailing();

io.on('connection', (socket) => {
  logStream.setupSocket(socket);
});

7、WebRTC 信令服务器

适用于视频会议、屏幕共享、P2P 文件传输等。

js
// 服务端 - WebRTC 信令
class WebRTCSignaling {
  constructor(io) {
    this.io = io;
    this.rooms = new Map(); // roomId -> { users, peerConnections }
  }
  
  setup() {
    this.io.on('connection', (socket) => {
      let currentRoom = null;
      let userId = null;
      
      socket.on('join call', (roomId, user, callback) => {
        currentRoom = roomId;
        userId = user.id;
        
        if (!this.rooms.has(roomId)) {
          this.rooms.set(roomId, {
            users: new Map(),
            host: userId
          });
        }
        
        const room = this.rooms.get(roomId);
        
        // 检查房间人数限制
        if (room.users.size >= 8) {
          return callback({ error: 'Room is full' });
        }
        
        socket.join(roomId);
        room.users.set(userId, {
          id: userId,
          name: user.name,
          socketId: socket.id,
          streamEnabled: false,
          micMuted: false,
          cameraOff: false
        });
        
        // 获取房间内其他用户信息
        const otherUsers = Array.from(room.users.values())
          .filter(u => u.id !== userId);
        
        callback({
          status: 'ok',
          users: otherUsers,
          isHost: userId === room.host
        });
        
        // 通知其他人新用户加入
        socket.to(roomId).emit('user joined', {
          id: userId,
          name: user.name
        });
      });
      
      // WebRTC Offer
      socket.on('offer', ({ to, offer }) => {
        const toSocket = this.getSocketById(to);
        if (toSocket) {
          toSocket.emit('offer', {
            from: userId,
            offer: offer
          });
        }
      });
      
      // WebRTC Answer
      socket.on('answer', ({ to, answer }) => {
        const toSocket = this.getSocketById(to);
        if (toSocket) {
          toSocket.emit('answer', {
            from: userId,
            answer: answer
          });
        }
      });
      
      // ICE Candidate
      socket.on('ice candidate', ({ to, candidate }) => {
        const toSocket = this.getSocketById(to);
        if (toSocket) {
          toSocket.emit('ice candidate', {
            from: userId,
            candidate: candidate
          });
        }
      });
      
      // 控制信号(静音/关闭摄像头)
      socket.on('media control', ({ userId: targetId, control, value }) => {
        const targetSocket = this.getSocketById(targetId);
        if (targetSocket) {
          targetSocket.emit('media control', {
            from: userId,
            control,
            value
          });
        }
      });
      
      // 屏幕共享
      socket.on('screen share start', () => {
        socket.to(currentRoom).emit('screen share started', userId);
      });
      
      socket.on('screen share stop', () => {
        socket.to(currentRoom).emit('screen share stopped', userId);
      });
      
      socket.on('disconnect', () => {
        if (currentRoom) {
          const room = this.rooms.get(currentRoom);
          if (room) {
            room.users.delete(userId);
            
            // 通知其他人
            socket.to(currentRoom).emit('user left', userId);
            
            // 如果是主持人离开,转移主持人权限
            if (userId === room.host && room.users.size > 0) {
              const newHost = Array.from(room.users.keys())[0];
              room.host = newHost;
              this.io.to(currentRoom).emit('host changed', newHost);
            }
            
            // 房间为空时清理
            if (room.users.size === 0) {
              this.rooms.delete(currentRoom);
            }
          }
        }
      });
    });
  }
  
  getSocketById(userId) {
    // 遍历查找用户的socket
    for (const [socketId, socket] of this.io.sockets.sockets) {
      if (socket.userId === userId) {
        return socket;
      }
    }
    return null;
  }
}

8、分布式任务队列监控

适用于消息队列监控、批处理任务进度、CI/CD 流水线等。

js
// 服务端
class TaskQueueMonitor {
  constructor(io, queue) {
    this.io = io;
    this.queue = queue; // Bull/Redis 队列实例
    this.jobSubscriptions = new Map();
    this.metrics = {
      processed: 0,
      failed: 0,
      avgDuration: 0,
      queueLength: 0
    };
  }
  
  async setup() {
    // 监听队列事件
    this.queue.on('global:waiting', (jobId) => {
      this.broadcastMetrics();
      this.io.emit('job waiting', { jobId, timestamp: Date.now() });
    });
    
    this.queue.on('global:active', (jobId) => {
      this.io.emit('job started', { jobId, timestamp: Date.now() });
    });
    
    this.queue.on('global:completed', (jobId, result) => {
      this.metrics.processed++;
      this.broadcastMetrics();
      this.io.emit('job completed', { 
        jobId, 
        result, 
        duration: Date.now() - jobStartTimes.get(jobId)
      });
    });
    
    this.queue.on('global:failed', (jobId, error) => {
      this.metrics.failed++;
      this.broadcastMetrics();
      this.io.emit('job failed', { jobId, error: error.message });
    });
    
    // 定期更新队列长度
    setInterval(async () => {
      const counts = await this.queue.getJobCounts();
      this.metrics.queueLength = counts.waiting + counts.active;
      this.broadcastMetrics();
    }, 2000);
  }
  
  setupSocket(socket) {
    // 订阅特定类型的任务
    socket.on('subscribe job type', (jobType) => {
      if (!this.jobSubscriptions.has(jobType)) {
        this.jobSubscriptions.set(jobType, new Set());
      }
      this.jobSubscriptions.get(jobType).add(socket.id);
    });
    
    // 获取当前所有任务
    socket.on('get all jobs', async (callback) => {
      const waiting = await this.queue.getWaiting();
      const active = await this.queue.getActive();
      const completed = await this.queue.getCompleted();
      const failed = await this.queue.getFailed();
      
      callback({
        waiting: waiting.map(j => ({ id: j.id, data: j.data })),
        active: active.map(j => ({ id: j.id, data: j.data, progress: j.progress() })),
        completed: completed.slice(0, 50),
        failed: failed.slice(0, 50)
      });
    });
    
    // 添加新任务
    socket.on('add job', async (jobData, callback) => {
      try {
        const job = await this.queue.add(jobData.type, jobData.payload, {
          priority: jobData.priority,
          delay: jobData.delay,
          attempts: jobData.retries
        });
        
        callback({ success: true, jobId: job.id });
        
        // 通知订阅者
        const subscribers = this.jobSubscriptions.get(jobData.type);
        if (subscribers) {
          for (const socketId of subscribers) {
            const client = this.io.sockets.sockets.get(socketId);
            if (client) {
              client.emit('new job', {
                id: job.id,
                type: jobData.type,
                data: jobData.payload
              });
            }
          }
        }
      } catch (error) {
        callback({ success: false, error: error.message });
      }
    });
    
    // 取消任务
    socket.on('cancel job', async (jobId, callback) => {
      const job = await this.queue.getJob(jobId);
      if (job) {
        await job.remove();
        callback({ success: true });
        this.io.emit('job